package com.burt.zookeeper.curator.queue.fifo;

import com.burt.zookeeper.curator.queue.CommonHelper;
import com.burt.zookeeper.curator.queue.CustomDistributedQueue;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

/**
 * Description: 分布式队列，fifo队列的实现
 * User: Burt
 * Date: 2017-08-22
 * Time: 8:58
 */
public class DistributedFifoQueue<T> implements CustomDistributedQueue<T> {

    private CuratorFramework curatorFramework;

    //根节点
    private final String root;

    private String NODE_NAME = "queue_fifo";

    public DistributedFifoQueue(CuratorFramework curatorFramework, String root) {
        this.curatorFramework = curatorFramework;
        this.root = root;
    }

    /**
     * 向队列提供数据
     * @param element
     * @return
     */
    @Override
    public boolean offer(T element){

        //构建数据节点的完整路径
        String nodeFullPath = root.concat("/").concat(NODE_NAME);

        try {
            curatorFramework.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodeFullPath,CommonHelper.Object2Byte(element));
        } catch (Exception e) {
            e.printStackTrace();
            try {
                curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(root);
                offer(element);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }

        return true;
    }

    /**
     * 从队列中获取数据
     * @return
     */
    @Override
    public T poll(){

        try {
            List<String> childrenNodes = curatorFramework.getChildren().forPath(root);
            if (childrenNodes.isEmpty()){
                System.out.println("fifo队列中还没有数据");
                return null;
            }

            //从小到大排序
            SortedSet<String> childrenSet = new TreeSet<String>();
            for (String node : childrenNodes) {
                //因为create成功返回的是全路径
                childrenSet.add(root.concat("/").concat(node));
            }

            for (String nodeName:childrenSet) {
                T node = (T) CommonHelper.Byte2Object(curatorFramework.getData().forPath(nodeName));
                curatorFramework.delete().forPath(nodeName);
                //TODO 这里貌似不需要这样遍历，直接返回最小序号节点的数据就行
                return node;
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }

    /**
     * 获取队列的大小
     * @return
     * @throws Exception
     */
    public int getSize() throws Exception {
        return curatorFramework.getChildren().forPath(root).size();
    }

    /**
     * 判断队列是否为空
     * @return
     * @throws Exception
     */
    public boolean isEmpty() throws Exception {
        return curatorFramework.getChildren().forPath(root).isEmpty();
    }

}
